Support streaming downloads #63

Open

ivgiuliani wants to merge 10 commits into master from stream-downloads

Conversation

ivgiuliani (Contributor) commented Apr 11, 2023

This adds support for a streaming interface for uploads and downloads, though only downloads are supported at the moment.

Requirements considered:

  • being able to easily access the content of a file in memory as we download it: this lets us process the content while the file is being downloaded
  • keep the interface similar to the existing (not stream-based) download/upload, whilst making it easy to iterate over individual chunks: this reduces the cognitive load of using the library

Neither GCS nor S3 natively supports streaming[1], so streaming is emulated using partial range downloads. The streaming interface is meant to be as compatible as possible with the non-streaming version:

Non-streamed download

This is the existing interface; no change there. It returns a hash with :bucket, :key and :content.

BucketStore.for("inmemory://bucket/file").download
=> {:bucket=>"bucket", :key=>"file", :content=>...}

Streamed download

The streaming version introduces a .stream accessor for the .download/.upload methods. .download takes an optional chunk_size argument to control the size of the chunks (defaulting to 4MB otherwise). .upload is not implemented as part of this PR, but .download returns an enumerable object where each element is a lazily downloaded chunk of at most chunk_size bytes. The major difference in this case is that the content is returned outside of the main hash, which makes it easier to iterate over individual chunks:

BucketStore.for("inmemory://bucket/file").stream.download
=> #<Enumerator: ...>

BucketStore.for("inmemory://bucket/file").stream.download.map { |item| item }
=> [[{:bucket=>"bucket", :key=>"file"}, "chunk 1"], [{:bucket=>"bucket", :key=>"file"}, "chunk 2"], ...]

BucketStore.for("inmemory://bucket/file").stream.download.map { |_, chunk| chunk }
=> ["chunk 1", "chunk 2", ...]

[1] sort of. S3's SDK does, in fact, support streaming downloads. Moreover, there are two ways it does this, but neither of them works well enough for our use cases. The first is to stream directly into a file-like object (e.g. StringIO); however, due to how this is implemented, we can't access the content of the file in memory as we download it. The second is to use blocks; however, when using blocks to download objects, the Ruby SDK will NOT retry failed requests after the first chunk of data has been yielded, as doing so could cause file corruption on the client end by starting over mid-stream. See https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/ for details.
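
For reference, a minimal sketch of the two SDK approaches described in the footnote, using standard aws-sdk-s3 calls (bucket and key names are placeholders):

require "stringio"
require "aws-sdk-s3"

client = Aws::S3::Client.new

# 1) Stream into a file-like object: the SDK writes into the IO, but the
#    content is not easily inspectable in memory while the download runs.
io = StringIO.new
client.get_object(bucket: "bucket", key: "file", response_target: io)

# 2) Block form: chunks are yielded as they arrive, but the SDK will not
#    retry a failed request once the first chunk has been yielded.
client.get_object(bucket: "bucket", key: "file") do |chunk|
  # process chunk
end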

@ivgiuliani ivgiuliani force-pushed the stream-downloads branch 3 times, most recently from dc6cdfe to 8d500d1, on April 12, 2023 08:51
Make sure integration tests always start with a clean slate, regardless
of how the previous test run has ended. Note that this can also fail,
but it's a good test in itself...
This introduces a new `.stream` interface for upload and download
operations. Currently only downloads are supported, and as part of this
commit there's only a reference implementation for the in-memory
adapter.

Requirements considered:
- being able to easily access the content of a file in memory as we
  download it: this is so we can process the content as the file gets
  downloaded
- keep the interface as similar as possible to the existing (not
  stream-based) download/upload: this reduces the cognitive load of
  using the library, as the only change between the streaming and
  non-streaming versions is adding the .stream keyword and iterating
  over chunks.
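
As an illustration of the in-memory reference implementation this commit describes, a minimal sketch (the stream_download signature matches the diff quoted further down; the @buckets hash holding file contents is an assumption made for the example):

def stream_download(bucket:, key:, chunk_size: nil)
  chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
  content = @buckets.fetch(bucket).fetch(key)

  Enumerator.new do |yielder|
    offset = 0
    while offset < content.bytesize
      # Yield the metadata hash together with a lazily produced chunk
      yielder << [{ bucket: bucket, key: key }, content.byteslice(offset, chunk_size)]
      offset += chunk_size
    end
  end
end
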
Adds support for streaming reads/downloads in the disk adapter.
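
A similar sketch for the disk adapter, reading the file in fixed-size chunks (the base_dir helper resolving bucket paths is assumed for the example):

def stream_download(bucket:, key:, chunk_size: nil)
  chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
  path = File.join(base_dir, bucket, key)

  Enumerator.new do |yielder|
    File.open(path, "rb") do |file|
      # IO#read returns nil at EOF, terminating the loop
      while (chunk = file.read(chunk_size))
        yielder << [{ bucket: bucket, key: key }, chunk]
      end
    end
  end
end
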
@ivgiuliani ivgiuliani force-pushed the stream-downloads branch 2 times, most recently from e0866d8 to a204732, on April 12, 2023 09:56
Adds support for streaming reads/downloads in the GCS adapter. Note that
GCS does _not_ natively support streaming downloads, so we emulate
streaming using partial range downloads.
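
A hedged sketch of how range-based chunking could look with the google-cloud-storage gem (bucket/key names and the chunk size constant are placeholders, and the range: option of File#download is assumed to be available in the gem version in use):

require "google/cloud/storage"

chunk_size = 4 * 1024 * 1024 # stands in for DEFAULT_STREAM_CHUNK_SIZE_BYTES

storage = Google::Cloud::Storage.new
file = storage.bucket("bucket").file("key")

(0...file.size).step(chunk_size) do |offset|
  # Each iteration issues an independent partial range request
  io = file.download(range: offset..(offset + chunk_size - 1))
  io.rewind
  chunk = io.read
  # process chunk here
end
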
S3's Ruby SDK supports streaming downloads in two ways:
- by streaming directly into a file-like object (e.g. `StringIO`).
  However due to how this is implemented, we can't access the content
  of the file in-memory as it gets downloaded. This is problematic as
  it's one of our requirements, since we may want to do file processing
  as we download it.
- by using block-based iteration. This has the big drawback that it will
  _not_ retry failed requests after the first chunk of data has been
  yielded, which could lead to file corruption on the client end by
  starting over mid-stream.

(https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/
for more details)

Therefore streaming downloads are implemented similarly to the GCS
adapter by leveraging partial range downloads. This adds an extra HTTP
call to AWS to obtain the overall file size, but is a more robust
solution as it both supports retries of individual chunks and
allows us to inspect the content of the file as we download it.
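
A hedged sketch of the equivalent approach with aws-sdk-s3 (names are placeholders; the extra HEAD request obtains the object size up front so chunk boundaries can be computed):

require "aws-sdk-s3"

chunk_size = 4 * 1024 * 1024 # stands in for DEFAULT_STREAM_CHUNK_SIZE_BYTES

client = Aws::S3::Client.new
size = client.head_object(bucket: "bucket", key: "key").content_length

(0...size).step(chunk_size) do |offset|
  last_byte = [offset + chunk_size, size].min - 1
  # Each chunk is an independent, retryable GET using an HTTP Range header
  chunk = client.get_object(
    bucket: "bucket",
    key: "key",
    range: "bytes=#{offset}-#{last_byte}",
  ).body.read
  # process chunk here
end
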
This also tests that it's possible to customise the chunk size on
individual adapters.
@ivgiuliani ivgiuliani marked this pull request as ready for review April 13, 2023 15:59
Comment on lines +61 to +62
def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
Contributor commented:

Suggested change
def stream_download(bucket:, key:, chunk_size: nil)
chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
def stream_download(bucket:, key:, chunk_size: DEFAULT_STREAM_CHUNK_SIZE_BYTES)

minor / nitpick, just curious why we're not using the default arg functionality

ivgiuliani (Contributor, Author) replied:

I think it's a PEBKAC...

ahjmorton pushed a commit that referenced this pull request Apr 18, 2023
The interface for this will change to add a `stream` version, similar to #63.

To prepare for that, we simplify the `upload` method.
ahjmorton pushed a commit that referenced this pull request Apr 18, 2023
Completely stolen from #63.

The KeyStreamer expects uploads / downloads to be expressed in terms of IO operations.

For example a download is actually `download into this IO` and upload is `upload the content from this file`.

Uploads / downloads of Strings are considered a special case of this. However, it is distinct enough that streaming should be its own API.
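
Purely as an illustration of the IO-centric shape described here (the receiver and method signatures below are hypothetical, not the actual KeyStreamer API):

# Downloads write into a caller-supplied IO; uploads read from one.
# String uploads/downloads then become the special case of wrapping a StringIO.
streamer.download(file: StringIO.new)              # "download into this IO"
streamer.upload(file: File.open("data.bin", "rb")) # "upload the content from this file"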